/*-------------------------------------------------------------------------
 *
 * amutils.c
 *      SQL-level APIs related to index access methods.
 *
 * Copyright (c) 2016-2017, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *      src/backend/utils/adt/amutils.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/amapi.h"
#include "access/htup_details.h"
#include "catalog/pg_class.h"
#include "catalog/pg_index.h"
#include "utils/builtins.h"
#include "utils/syscache.h"


/* Convert string property name to enum, for efficiency */
struct am_propname
{
    const char *name;            /* property name string, matched
                                 * case-insensitively by lookup_prop_name */
    IndexAMProperty prop;        /* enum value that the name maps to */
};

/*
 * Table of the property names recognized by the generic code in this file,
 * searched linearly by lookup_prop_name().  Entries are grouped by the level
 * at which indexam_property() handles them.
 */
static const struct am_propname am_propnames[] =
{
    /* column-level properties */
    {"asc", AMPROP_ASC},
    {"desc", AMPROP_DESC},
    {"nulls_first", AMPROP_NULLS_FIRST},
    {"nulls_last", AMPROP_NULLS_LAST},
    {"orderable", AMPROP_ORDERABLE},
    {"distance_orderable", AMPROP_DISTANCE_ORDERABLE},
    {"returnable", AMPROP_RETURNABLE},
    {"search_array", AMPROP_SEARCH_ARRAY},
    {"search_nulls", AMPROP_SEARCH_NULLS},

    /* index-level properties */
    {"clusterable", AMPROP_CLUSTERABLE},
    {"index_scan", AMPROP_INDEX_SCAN},
    {"bitmap_scan", AMPROP_BITMAP_SCAN},
    {"backward_scan", AMPROP_BACKWARD_SCAN},

    /* AM-level properties */
    {"can_order", AMPROP_CAN_ORDER},
    {"can_unique", AMPROP_CAN_UNIQUE},
    {"can_multi_col", AMPROP_CAN_MULTI_COL},
    {"can_exclude", AMPROP_CAN_EXCLUDE}
};

static IndexAMProperty
lookup_prop_name(const char *name)
{
    int            i;

    for (i = 0; i < lengthof(am_propnames); i++)
    {
        if (pg_strcasecmp(am_propnames[i].name, name) == 0)
            return am_propnames[i].prop;
    }

    /* We do not throw an error, so that AMs can define their own properties */
    return AMPROP_UNKNOWN;
}

/*
 * Shared helper for properties that reduce to testing one bit of a column's
 * indoption value.
 *
 * relid, attno: the index and (1-based) column whose indoption is examined.
 * guard: when false, the result is forced to boolean false without looking
 *        at the catalog (lets callers avoid a separate test).
 * iopt_mask: selects the indoption bit of interest.
 * iopt_expect: masked value that means "true" (should be 0 or iopt_mask).
 * res: receives the boolean result.
 *
 * The return value says whether *res was set: true means *res holds the
 * answer, false means the caller should report NULL
 * ("unknown/inapplicable"), which happens when no pg_index entry is found
 * for relid.
 */
static bool
test_indoption(Oid relid, int attno, bool guard,
               int16 iopt_mask, int16 iopt_expect,
               bool *res)
{
    HeapTuple    indextup;
    Form_pg_index indexform PG_USED_FOR_ASSERTS_ONLY;
    Datum        optdatum;
    bool        optisnull;
    int2vector *optvector;
    int16        colopt;

    /* Guard not satisfied: the answer is a definite "false" */
    if (!guard)
    {
        *res = false;
        return true;
    }

    /* Fetch the pg_index row; without one we cannot give an answer */
    indextup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(relid));
    if (!HeapTupleIsValid(indextup))
        return false;
    indexform = (Form_pg_index) GETSTRUCT(indextup);

    Assert(relid == indexform->indexrelid);
    Assert(attno > 0 && attno <= indexform->indnatts);

    /* Pull out the per-column option flags and pick this column's entry */
    optdatum = SysCacheGetAttr(INDEXRELID, indextup,
                               Anum_pg_index_indoption, &optisnull);
    Assert(!optisnull);

    optvector = ((int2vector *) DatumGetPointer(optdatum));
    colopt = optvector->values[attno - 1];

    *res = ((colopt & iopt_mask) == iopt_expect);

    ReleaseSysCache(indextup);

    return true;
}


/*
 * Evaluate a named property of an index AM, an index, or an index column.
 *
 * The SQL-callable functions below all funnel into this routine.  amoid and
 * index_oid are mutually exclusive: with a valid index_oid the AM is taken
 * from the index's pg_class row, and with no index_oid the test is AM-wide.
 * attno is the 1-based column number for a column-level test, else 0.
 *
 * The result is a boolean Datum, or SQL NULL when the answer is unknown or
 * the property does not apply to the object being tested.
 */
static Datum
indexam_property(FunctionCallInfo fcinfo,
                 const char *propname,
                 Oid amoid, Oid index_oid, int attno)
{// #lizard forgives
    IndexAMProperty prop;
    IndexAmRoutine *amroutine;
    int            ncolumns = 0;
    bool        result = false;
    bool        result_isnull = false;

    /* Map the name onto its enum value; an unknown name is not an error */
    prop = lookup_prop_name(propname);

    /* Given an index OID, find its AM and its number of columns */
    if (OidIsValid(index_oid))
    {
        HeapTuple    classtup;
        Form_pg_class classform;
        bool        is_index;

        Assert(!OidIsValid(amoid));

        classtup = SearchSysCache1(RELOID, ObjectIdGetDatum(index_oid));
        if (!HeapTupleIsValid(classtup))
            PG_RETURN_NULL();

        classform = (Form_pg_class) GETSTRUCT(classtup);
        is_index = (classform->relkind == RELKIND_INDEX ||
                    classform->relkind == RELKIND_PARTITIONED_INDEX);
        if (is_index)
        {
            amoid = classform->relam;
            ncolumns = classform->relnatts;
        }
        ReleaseSysCache(classtup);

        /* Relations that are not indexes have no answer */
        if (!is_index)
            PG_RETURN_NULL();
    }

    /*
     * From here on, index_oid is either InvalidOid or the OID of a real
     * index.  The range check below additionally ensures that attno is
     * either 0 (index-wide or AM-wide test) or a valid column number of that
     * index.
     */
    if (attno < 0 || attno > ncolumns)
        PG_RETURN_NULL();

    /* Fetch the AM's routine struct; no valid AM means a NULL result */
    amroutine = GetIndexAmRoutineByAmId(amoid, true);
    if (amroutine == NULL)
        PG_RETURN_NULL();

    /*
     * Let the AM's own property routine, if any, answer first.  When it
     * returns false we fall through to the generic logic below.
     */
    if (amroutine->amproperty != NULL &&
        amroutine->amproperty(index_oid, attno, prop, propname,
                              &result, &result_isnull))
    {
        if (result_isnull)
            PG_RETURN_NULL();
        PG_RETURN_BOOL(result);
    }

    /* Column-level properties */
    if (attno > 0)
    {
        switch (prop)
        {
            case AMPROP_ASC:
                if (!test_indoption(index_oid, attno, amroutine->amcanorder,
                                    INDOPTION_DESC, 0, &result))
                    PG_RETURN_NULL();
                PG_RETURN_BOOL(result);

            case AMPROP_DESC:
                if (!test_indoption(index_oid, attno, amroutine->amcanorder,
                                    INDOPTION_DESC, INDOPTION_DESC, &result))
                    PG_RETURN_NULL();
                PG_RETURN_BOOL(result);

            case AMPROP_NULLS_FIRST:
                if (!test_indoption(index_oid, attno, amroutine->amcanorder,
                                    INDOPTION_NULLS_FIRST, INDOPTION_NULLS_FIRST,
                                    &result))
                    PG_RETURN_NULL();
                PG_RETURN_BOOL(result);

            case AMPROP_NULLS_LAST:
                if (!test_indoption(index_oid, attno, amroutine->amcanorder,
                                    INDOPTION_NULLS_FIRST, 0, &result))
                    PG_RETURN_NULL();
                PG_RETURN_BOOL(result);

            case AMPROP_ORDERABLE:
                PG_RETURN_BOOL(amroutine->amcanorder);

            case AMPROP_DISTANCE_ORDERABLE:

                /*
                 * Whether a column is distance-orderable is really the AM's
                 * call (at time of writing, only GiST supports it at all).
                 * The planner decides by looking for an operator with
                 * amoppurpose 'o', but deriving that from just the index
                 * column type seems like a lot of work, so the AM is
                 * expected to answer in its amproperty routine.  Generic
                 * fallback: false if the AM says it never supports this,
                 * otherwise NULL (we don't know).
                 */
                if (amroutine->amcanorderbyop)
                    PG_RETURN_NULL();
                PG_RETURN_BOOL(false);

            case AMPROP_RETURNABLE:
                if (amroutine->amcanreturn == NULL)
                    PG_RETURN_BOOL(false);

                /*
                 * Generic fallback: open the index and ask.  An AM should
                 * preferably answer this in its amproperty function without
                 * opening the rel.
                 */
                {
                    Relation    indexrel;

                    indexrel = index_open(index_oid, AccessShareLock);
                    result = index_can_return(indexrel, attno);
                    index_close(indexrel, AccessShareLock);
                }

                PG_RETURN_BOOL(result);

            case AMPROP_SEARCH_ARRAY:
                PG_RETURN_BOOL(amroutine->amsearcharray);

            case AMPROP_SEARCH_NULLS:
                PG_RETURN_BOOL(amroutine->amsearchnulls);

            default:
                PG_RETURN_NULL();
        }
    }

    /*
     * Index-level properties.  Today these depend only on the AM, but that
     * might not be true forever, so users must name an index rather than
     * just an AM.
     */
    if (OidIsValid(index_oid))
    {
        switch (prop)
        {
            case AMPROP_CLUSTERABLE:
                PG_RETURN_BOOL(amroutine->amclusterable);

            case AMPROP_INDEX_SCAN:
                PG_RETURN_BOOL(amroutine->amgettuple != NULL);

            case AMPROP_BITMAP_SCAN:
                PG_RETURN_BOOL(amroutine->amgetbitmap != NULL);

            case AMPROP_BACKWARD_SCAN:
                PG_RETURN_BOOL(amroutine->amcanbackward);

            default:
                PG_RETURN_NULL();
        }
    }

    /*
     * AM-level properties (those that control what you can say in CREATE
     * INDEX).
     */
    switch (prop)
    {
        case AMPROP_CAN_ORDER:
            PG_RETURN_BOOL(amroutine->amcanorder);

        case AMPROP_CAN_UNIQUE:
            PG_RETURN_BOOL(amroutine->amcanunique);

        case AMPROP_CAN_MULTI_COL:
            PG_RETURN_BOOL(amroutine->amcanmulticol);

        case AMPROP_CAN_EXCLUDE:
            PG_RETURN_BOOL(amroutine->amgettuple != NULL);

        default:
            PG_RETURN_NULL();
    }
}

/*
 * SQL-callable: test an AM-wide property.
 *
 * Argument 0 is the AM's OID, argument 1 the property name.
 */
Datum
pg_indexam_has_property(PG_FUNCTION_ARGS)
{
    Oid            am_oid;
    char       *prop_cstr;

    am_oid = PG_GETARG_OID(0);
    prop_cstr = text_to_cstring(PG_GETARG_TEXT_PP(1));

    /* No index and no column: AM-level test */
    return indexam_property(fcinfo, prop_cstr, am_oid, InvalidOid, 0);
}

/*
 * SQL-callable: test an index-wide property.
 *
 * Argument 0 is the index's OID, argument 1 the property name.
 */
Datum
pg_index_has_property(PG_FUNCTION_ARGS)
{
    Oid            idx_oid;
    char       *prop_cstr;

    idx_oid = PG_GETARG_OID(0);
    prop_cstr = text_to_cstring(PG_GETARG_TEXT_PP(1));

    /* The AM is derived from the index; attno 0 selects an index-level test */
    return indexam_property(fcinfo, prop_cstr, InvalidOid, idx_oid, 0);
}

/*
 * SQL-callable: test a property of a single index column.
 *
 * Argument 0 is the index's OID, argument 1 the column number, and
 * argument 2 the property name.
 */
Datum
pg_index_column_has_property(PG_FUNCTION_ARGS)
{
    Oid            idx_oid;
    int32        colno;
    char       *prop_cstr;

    idx_oid = PG_GETARG_OID(0);
    colno = PG_GETARG_INT32(1);
    prop_cstr = text_to_cstring(PG_GETARG_TEXT_PP(2));

    /*
     * Non-positive column numbers give NULL right away, so that inside
     * indexam_property attno > 0 always means a column-level test.
     */
    if (colno < 1)
        PG_RETURN_NULL();

    return indexam_property(fcinfo, prop_cstr, InvalidOid, idx_oid, colno);
}
